package com.shellyan.gmall.rt.app

import com.alibaba.fastjson.JSON
import com.atguigu.realtime.gmall.common.Constant
import com.shellyan.gmall.rt.bean.OrderInfo
import org.apache.spark.streaming.dstream.DStream
import org.json4s.CustomKeySerializer
import org.json4s.JsonAST.{JDouble, JString}
import org.json4s.jackson.JsonMethods

/**
 * @author Shelly An
 * @create 2020/9/7 9:25
 */
object OrderApp extends BaseApp {
  override val master: String = "local[2]"
  override val appName: String = "OrderApp"
  override val batchTime: Int = 3
  override val topics: Set[String] = Set(Constant.ORDER_INFO_TOPIC)
  override val groupId: String = "OrderApp"

  /**
   * ali的json解析会有bug的话，可以尝试用自带的scala的解析方式
   * 在scala中更专业
   */
//  object StringToDouble extends CustomKeySerializer[Double](format => {
//    {
//      case JString(d) => d.toDouble
//    }
//    {
//      case d: Double => JDouble(d)
//    }
//  })


  override def run(sourceStream: DStream[String]): Unit = {
    sourceStream.map(json => JSON.parseObject(json, classOf[OrderInfo]))
      .foreachRDD(rdd => {
        import org.apache.phoenix.spark._
        rdd.saveToPhoenix("gmall_order_info"
          ,
          Seq("ID", "PROVINCE_ID", "CONSIGNEE", "ORDER_COMMENT", "CONSIGNEE_TEL", "ORDER_STATUS", "PAYMENT_WAY",
            "USER_ID", "IMG_URL", "TOTAL_AMOUNT", "EXPIRE_TIME", "DELIVERY_ADDRESS", "CREATE_TIME", "OPERATE_TIME",
            "TRACKING_NO", "PARENT_ORDER_ID", "OUT_TRADE_NO", "TRADE_BODY", "CREATE_DATE", "CREATE_HOUR")
          ,
          zkUrl = Some("hadoop102,hadoop103,hadoop104:2181")
        )
      })
  }
}
